package org.example.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.example.protocol.bilibili.packet.*;
import org.example.service.consts.CommonConsts;
import org.example.util.ConfigLoader;
import org.example.util.KafkaUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j

public class BiliBiliDanmuClient extends WebSocketClient {
    private Boolean status = true;

    private Integer roomId;

    private String token;

    private String buvid;

    private ScheduledExecutorService scheduler;

    private KafkaProducer<String, String> kafkaProducer;

    private static final String DANMU_TOPIC;

    static {
        DANMU_TOPIC = ConfigLoader.loadProperties("kafka.properties").getProperty("danmu.topic", "bilibili-danmu");
    }


    public BiliBiliDanmuClient(URI serverUri,Integer roomId,String token,String buvid) {
        super(serverUri);
        this.roomId = roomId;
        this.token = token;
        this.buvid = buvid;
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.kafkaProducer = KafkaUtils.getProducerInstance();
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("websocket服务器 连接成功,{}",serverHandshake.getHttpStatusMessage());
        //发送认证包
        AuthDTO authDTO = new AuthDTO();
        authDTO.setRoomId(roomId)
                .setKey(token)
                .setBuvid(buvid)
                .setProtover(ProtocolVersion.ZLIB_COMPRESSED_NORMAL.getCode())
                .setType(PacketType.HEARTBEAT.getCode());
        AuthPacket authPacket = new AuthPacket(authDTO,0L);
        this.send(authPacket.buildAuthPacket().getBytes());
        log.info("认证包发送完成");
        //发送心跳包
        this.startHeartBeatTask();
    }

    @Override
    public void onMessage(String s) {
        System.out.println(s);
        log.info("message={}",s);
    }

    @Override
    public void onMessage(ByteBuffer bytes) {
        List<BiliBiliDataPacket> packets = new ArrayList<>();
        try{
            packets = BiliBiliDataPacket.parsePackets(bytes);
        }catch(Exception ex){
            log.error("parse message error,",ex);
        }

        packets.forEach(packet -> {
            // log.debug("protocolVersion={},packetType={}",packet.getProtocolVersion(),packet.getPacketType());
            if(packet.getPacketType() == PacketType.COMMAND){
                JSONObject messageInfo =JSON.parseObject(packet.getContent());
                String cmd = messageInfo.getString("cmd");
                if("DANMU_MSG".equals(cmd) && Objects.nonNull(messageInfo)){
                    // 推送完整的messageInfo到Kafka
                    this.sendToKafka(messageInfo);
                }
            }
        });
    }

    @Override
    public void onClose(int i, String s, boolean b) {
        log.info("websocket 服务已经关闭,{},{},{}",i,s,b);
        status = false;
        // 关闭资源
        closeResources();
    }

    @Override
    public void onError(Exception e) {
        log.error("报错,",e);

    }

    public Boolean getStatus(){
        return this.status;
    }

    private void startHeartBeatTask(){
        long seq = 0L;
        scheduler.scheduleAtFixedRate(()->this.sendHeartBeat(seq),0,30, TimeUnit.SECONDS);
    }

    private void sendHeartBeat(long seq) {
        BiliBiliDataPacket heartBeatPacket = new BiliBiliDataPacket(ProtocolVersion.PLAIN_TEXT_AUTHENTICATION, PacketType.HEARTBEAT,"heart beat packet",seq++);
        this.send(heartBeatPacket.buildPacket());

        log.info("心跳包发送结束,isOpen={}",this.isOpen());
    }

    /**
     * 将弹幕消息发送到Kafka
     * @param messageInfo 弹幕消息JSON对象
     */
    private void sendToKafka(JSONObject messageInfo) {
        try {
            messageInfo.put(CommonConsts.DanMuMessageFieldKey.APP_KEY,CommonConsts.BILIBILI);
            messageInfo.put(CommonConsts.DanMuMessageFieldKey.ROOM_KEY,Integer.toString(this.roomId));
            String messageJson = messageInfo.toJSONString();
            ProducerRecord<String, String> record = new ProducerRecord<>(DANMU_TOPIC, messageJson);

            // 异步发送消息到Kafka
            KafkaUtils.sendMessageAsync(kafkaProducer, record);

            log.debug("弹幕消息已发送到Kafka, topic: {}", DANMU_TOPIC);
        } catch (Exception e) {
            log.error("发送弹幕消息到Kafka失败", e);
        }
    }

    /**
     * 关闭资源
     */
    private void closeResources() {
        try {
            if (scheduler != null && !scheduler.isShutdown()) {
                scheduler.shutdown();
                log.info("定时任务调度器已关闭");
            }
        } catch (Exception e) {
            log.error("关闭定时任务调度器失败", e);
        }
    }
}
